package com.mgy.example.service.common.lock;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.springframework.util.CollectionUtils;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * zookeeper实现分布式锁测试
 */
public class ZkMutexLock {

    /**
     * 是否获取锁
     */
    private boolean hasLock = false;
    /**
     * 前一个节点path
     */
    private String preNodePath;
    /**
     * 当前节点path
     */
    private String currentNodePath;

    /**
     * 锁的根节点
     */
    private String lockRoot;

    private WatcherRemoveCuratorFramework client;

    private static final String PREFIX = "lock_";

    public ZkMutexLock(CuratorFramework curatorFramework) {
        this(null, curatorFramework);
    }

    public ZkMutexLock(String lockRoot, CuratorFramework curatorFramework) {
        if (StringUtils.isBlank(lockRoot)) {
            this.lockRoot = "/lockRoot";
        } else {
            if (!lockRoot.startsWith("/")) {
                this.lockRoot = "/" + lockRoot;
            }
        }
        this.client = curatorFramework.newWatcherRemoveCuratorFramework();
    }


    private void validatePath(String path) throws Exception {
        if (StringUtils.isBlank(path)) {
            throw new Exception("节点名称不能为空");
        }
        if (!path.startsWith("/")) {
            throw new Exception("节点名称必须以/开头");
        }
    }

    public void tryLock(long timeout, TimeUnit timeUnit) throws Exception {
        String fullPath = lockRoot + "/" + PREFIX;
        hasLock = false;
        /**
         * CreateMode有四种选择
         * PERSISTENT：持久化目录节点，存储的数据不会丢失
         * PERSISTENT_SEQUENTIAL：顺序自动编号的持久化目录节点，存储的数据不会丢失，并且根据当前已近存在的节点数自动加 1，然后返回给客户端已经成功创建的目录节点名
         * EPHEMERAL：临时目录节点，一旦创建这个节点的客户端与服务器端口也就是session 超时，这种节点会被自动删除
         * EPHEMERAL_SEQUENTIAL：临时自动编号节点，一旦创建这个节点的客户端与服务器端口也就是session 超时，这种节点会被自动删除，并且根据当前已近存在的节点数自动加 1，然后返回给客户端已经成功创建的目录节点名
         *
         *ACL权限控制
         *CREATE:创建子节点的权限
         *READ：获取子节点数据和子节点列表的权限
         *WRITE：更新节点数据的权限
         *DELETE：删除节点数据的权限
         *ADMIN：设置节点ACL的权限
         */
        // /lockRoot/lock_0000000000
        currentNodePath = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(fullPath);
        while ((client.getState() == CuratorFrameworkState.STARTED) && !hasLock) {
            try {
                List<String> children = client.getChildren().forPath(lockRoot);
                if (CollectionUtils.isEmpty(children)) {
                    continue;
                }
                //按节点名称排序，当前节点如果是最小的则认为获取到锁
                Collections.sort(children);
                System.out.println("当前节点：" + currentNodePath + ",子节点：" + StringUtils.join(children, ","));
                String zNode = currentNodePath.substring(currentNodePath.lastIndexOf("/") + 1);
                if (zNode.equals(children.get(0))) {
                    //获得锁
                    hasLock = true;
                    break;
                }
                //监听自己前面的节点
                Integer index = children.indexOf(zNode);
                String preNode = children.get(index - 1);
                preNodePath = lockRoot + "/" + preNode;
                synchronized (this) {
                    System.out.println("注册事件:" + preNodePath);
                    client.getData().usingWatcher((Watcher) watchedEvent -> {
                        System.out.println("通知所有线程：" + watchedEvent.getPath());
                        notifyThread();

                    }).forPath(preNodePath);

                    if (timeout > 0 && timeUnit != null) {
                        wait(timeout);
                    } else {
                        wait();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void tryLock() throws Exception {
        this.tryLock(-1, null);
    }

    public void unlock() throws Exception {
        if (client.checkExists().forPath(currentNodePath) != null) {
            client.removeWatchers();
            client.delete().guaranteed().forPath(currentNodePath);

        }
    }

    public synchronized void notifyThread() {
        notifyAll();
    }

    /**
     * 创建 watcher 事件
     */
    @Deprecated
    private void addListener(String path, String node) throws Exception {
        synchronized (this) {
            this.validatePath(path);
            final PathChildrenCache cache = new PathChildrenCache(client, path, false);
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener((client, event) -> {
                System.out.println("删除事件：" + event.getType() + ";" + event.getData().getPath());
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String oldPath = event.getData().getPath();
                    System.out.println("节点删除：" + oldPath);
                    if (oldPath.contains(node)) {
                        notifyThread();
                    }
                }
            });
        }
    }
}
